{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Counting Museums\n",
    "\n",
    "This chapter will introduce some of the aggregation functions available to us in Spark SQL. We are going to learn to do the following in this chapter:\n",
    "\n",
    "**Count the number of museums in Zurich by performing a spatial join between the Cultural Points of Interests and City Boundaries.**\n",
    "\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Populating the interactive namespace from numpy and matplotlib\n"
     ]
    }
   ],
   "source": [
    "# %load './code/helpers/imports.py'\n",
    "import notebook\n",
    "import os.path, json, io, pandas\n",
    "import matplotlib.pyplot as plt\n",
    "%pylab inline\n",
    "pylab.rcParams['figure.figsize'] = (54, 60)\n",
    "\n",
    "\n",
    "from retrying import retry # for exponential back down when calling TurboOverdrive API\n",
    "\n",
    "import pyspark.sql.functions as func # resuse as func.coalace for example\n",
    "from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType,DecimalType\n",
    "\n",
    "import pandas as pandas\n",
    "from geopandas import GeoDataFrame # Loading boundaries Data\n",
    "from shapely.geometry import Point, Polygon, shape # creating geospatial data\n",
    "from shapely import wkb, wkt # creating and parsing geospatial data\n",
    "import overpy # OpenStreetMap API\n",
    "\n",
    "from ast import literal_eval as make_tuple # used to decode data from java\n",
    "\n",
    "# make sure nbextensions are installed\n",
    "notebook.nbextensions.check_nbextension('usability/codefolding', user=True)\n",
    "\n",
    "try:\n",
    "    sc\n",
    "except NameError:\n",
    "    import pyspark\n",
    "    sc = pyspark.SparkContext('local[*]')\n",
    "    sqlContext = pyspark.sql.SQLContext(sc)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# %load './code/helpers/load_boundaries_and_pois.py'\n",
    "OVERPASS_API         = overpy.Overpass()\n",
    "BASE_DIR             = os.path.join(os.path.abspath('.'), 'work-flow')\n",
    "URBAN_BOUNDARIES_FILE = '06_Europe_Cities_Boundaries_with_Labels_Population.geo.json'\n",
    "\n",
    "# Paths to base datasets that we are using:\n",
    "URBAN_BOUNDARIES_PATH = os.path.join(BASE_DIR,URBAN_BOUNDARIES_FILE)\n",
    "POIS_PATH            = os.path.join(BASE_DIR, \"pois.json\")\n",
    "\n",
    "try:\n",
    "    geo_df\n",
    "except NameError:\n",
    "    geo_df = GeoDataFrame.from_file(URBAN_BOUNDARIES_PATH)\n",
    "    # Add a WKT column for use later\n",
    "    geo_df['wkt'] = pandas.Series(\n",
    "        map(lambda geom: str(geom.to_wkt()), geo_df['geometry']),\n",
    "        index=geo_df.index, dtype='string')\n",
    "\n",
    "try:\n",
    "    boundaries_from_pd\n",
    "except NameError:\n",
    "    boundaries_from_pd = sqlContext.createDataFrame(geo_df)\n",
    "    # boundaries_from_pd.createOrReplaceTempView(\"boundaries\")\n",
    "    boundaries_from_pd.registerTempTable(\"boundaries\")\n",
    "\n",
    "try:\n",
    "    pois_df\n",
    "except NameError:\n",
    "    pois_df = sqlContext.read.json(POIS_PATH)\n",
    "    pois_df = pois_df.toPandas()\n",
    "    def toWktColumn(coords):\n",
    "        return (Point(coords).wkt)\n",
    "\n",
    "    pois_df['wkt'] = pandas.Series(\n",
    "        map(lambda geom: toWktColumn(geom.coordinates), pois_df['geometry']),\n",
    "        index=pois_df.index, dtype='string')\n",
    "\n",
    "    pois_df = sqlContext.createDataFrame(pois_df)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Count the number of museums in Zurich\n",
    "\n",
    "So give what we know about our final goal of calculating the cultural score for each city we can start with smaller steps of running our analysis on a single city and for single type of location. So lets start with museums in Zurich.\n",
    "\n",
    "## Exercise\n",
    "\n",
    "* Count the number of museums in Zurich\n",
    "HINT: You will need Shapely Spatial Operations"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# --------------------- Solution Below -----------------------------#\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Solution\n",
    "\n",
    "Our approach to finding the count of museums will be as follows:\n",
    "\n",
    "1. Get all museums from POIs as Spark Dataframe\n",
    "2. Get boundary of Zurich as Spark Dataframe\n",
    "3. Get the Shapely Polygon Object for the Zurich boundary from the WKT string of the boundary geometry\n",
    "4. Filter each POI by checking if it's coordinates (as Shapely Point Object) is within the boundary (as Shapely Polygon Object)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1358\n"
     ]
    },
    {
     "data": {
      "image/svg+xml": [
       "<svg xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\" width=\"100.0\" height=\"100.0\" viewBox=\"8.44096254054 47.3131580402 0.190907821406 0.128784319976\" preserveAspectRatio=\"xMinYMin meet\"><g transform=\"matrix(1,0,0,-1,0,94.7551004004)\"><g><path fill-rule=\"evenodd\" fill=\"#66cc99\" stroke=\"#555555\" stroke-width=\"0.00381815642811\" opacity=\"0.6\" d=\"M 8.51073070064,47.4321856996 L 8.5246681998,47.4311641999 L 8.52792424989,47.4326966806 L 8.54355820194,47.4313432 L 8.56083770048,47.4184912002 L 8.56327356287,47.4156632294 L 8.57378618013,47.4113305375 L 8.58770695385,47.407749793 L 8.59636970013,47.4056056999 L 8.594696201,47.3992081997 L 8.59148070168,47.3970187004 L 8.58817119975,47.3957447 L 8.58434520098,47.3918312003 L 8.58305470172,47.3890491999 L 8.58436120054,47.3883292 L 8.5851962018,47.3903277 L 8.5869666988,47.3901596998 L 8.59307370002,47.3856432004 L 8.59720520051,47.3784982 L 8.60363484106,47.3704819749 L 8.61199420077,47.3669092004 L 8.62166820051,47.3599741997 L 8.62479970189,47.3543931997 L 8.6220897007,47.3534011997 L 8.6051131999,47.3536037 L 8.58677770185,47.3528596999 L 8.5864354184,47.352688935 L 8.58451057778,47.3517285918 L 8.56990702022,47.3467946792 L 8.56574596328,47.3467780497 L 8.56250970122,47.3456917005 L 8.56149420162,47.3473436998 L 8.55820569994,47.3507997 L 8.5563412017,47.3521042001 L 8.55120469974,47.3526537002 L 8.55048569934,47.3533747 L 8.54713070159,47.3588337002 L 8.545538288,47.3639849772 L 8.53955331661,47.364938249 L 8.53629265028,47.3521106463 L 8.54340170119,47.3360942003 L 8.53623520009,47.3301472003 L 8.53265970044,47.3278657003 L 8.52974519821,47.3260612002 L 8.52375020113,47.3242416997 L 8.50311869945,47.3202287003 L 8.50062969947,47.3226967 L 8.50062369878,47.3311082 L 8.50116369968,47.3352741999 L 8.50202970008,47.3365517003 L 8.4976237013,47.3420911997 L 8.48689120125,47.3526882002 L 8.48202213556,47.3593225016 L 8.47101420108,47.3615112002 L 8.46426969911,47.3667376998 L 8.46415819949,47.3683702 L 8.46985165658,47.3739106058 L 8.4675942019,47.3735771996 L 8.45054820109,47.3783301999 L 8.44803320059,47.3801996998 L 8.45758720155,47.3836557003 L 8.46734620155,47.3930667002 L 8.46706556474,47.4018697238 L 8.47813170148,47.4038737001 L 8.47724070188,47.4119377002 L 8.4739331988,47.4122277002 L 8.4727222,47.4106867 L 8.46807869985,47.4133762002 L 8.46919819812,47.4169086996 L 8.47785369948,47.4221192001 L 8.48175570114,47.4230121998 L 8.48853519976,47.4226992001 L 8.49021440629,47.4225576846 L 8.48474119998,47.4281771996 L 8.48547469967,47.4310416998 L 8.50146770072,47.4348717001 L 8.51073070064,47.4321856996 z\" /></g></g></svg>"
      ],
      "text/plain": [
       "<shapely.geometry.multipolygon.MultiPolygon at 0x10ba64ad0>"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Register a Temporary Table to allow us to call SQL-like statements\n",
    "# in Apache Spark against the Points of Interests SparkSQL Dataframe:\n",
    "\n",
    "# pois_df.createOrReplaceTempView(\"pois\") # spark 2.0 version\n",
    "pois_df.registerTempTable(\"pois\")\n",
    "\n",
    "# Select all museums:\n",
    "museums = sqlContext.sql(\n",
    "    \"SELECT geometry, \\\n",
    "    properties.name, \\\n",
    "    properties.tourism \\\n",
    "    FROM pois WHERE properties.tourism = 'museum'\")\n",
    "\n",
    "# lets see how many museums we have in our POI dataset\n",
    "print museums.count()\n",
    "\n",
    "# Select Zurich from boundaries dataframe \n",
    "\n",
    "zurich = sqlContext.sql(\"SELECT wkt, POPEU2013 \\\n",
    "    FROM boundaries WHERE NAME = 'Zurich'\")\n",
    "\n",
    "# get the geographic boundary of Zurich as GeoJSON:\n",
    "json_str = zurich.toJSON().take(1)[0]\n",
    "json_obj = json.loads(json_str)\n",
    "zurich_boundry = wkt.loads(json_obj['wkt'])\n",
    "zurich_boundry"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "museums.rdd class name:  RDD\n",
      "museums_in_zurich class name:  PipelinedRDD\n",
      "museums_in_zurich.collect() class name:  list\n",
      "count of museums in zurich:  25\n"
     ]
    }
   ],
   "source": [
    "# filter museum rdd by performing a within operation\n",
    "museums_in_zurich = museums.rdd.filter(\n",
    "    lambda r: Point(r['geometry']['coordinates']).intersects(zurich_boundry)\n",
    ")\n",
    "\n",
    "print \"museums.rdd class name: \", museums.rdd.__class__.__name__\n",
    "print \"museums_in_zurich class name: \", museums_in_zurich.__class__.__name__\n",
    "print \"museums_in_zurich.collect() class name: \", museums_in_zurich.collect().__class__.__name__\n",
    "print \"count of museums in zurich: \", museums_in_zurich.count()\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Filtering by Spatial Predicate\n",
    "\n",
    "Notice how we call `museums.rdd.filter` on the Spark Dataframe. This requires a bit of explaining. \n",
    "\n",
    "Firstly the call to [`Dataframe.rdd`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.rdd) returns an RDD representing the Dataframe. We then call filter on the RDD with a lambda function that will filter each item by weather or not it lies inside the Zurich boundary.\n",
    "\n",
    "As we know RDD `transformations` are lazily evaluated when an `action` is performed. Notice how when we go to print the filtered value, it prints out that it is an RDD and that the type is a PipelinedRDD not a list of values as we might expect. That's because we haven't performed an `action` yet, we've only performed a `transformation` by calling `filter`. The `action` is only performed when we call `collect()` or `count()` which are both actions."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+--------------------+-------+\n",
      "|            geometry|                name|tourism|\n",
      "+--------------------+--------------------+-------+\n",
      "|[WrappedArray(8.5...|Ortsmuseum Wollis...| museum|\n",
      "|[WrappedArray(8.5...|      Kriminalmuseum| museum|\n",
      "|[WrappedArray(8.5...|Museum für Gestal...| museum|\n",
      "|[WrappedArray(8.5...|Museum Porzellan ...| museum|\n",
      "|[WrappedArray(8.5...|   Zivilschutzmuseum| museum|\n",
      "|[WrappedArray(8.5...|    Ortsmuseum Höngg| museum|\n",
      "|[WrappedArray(8.5...|     Museum Rietberg| museum|\n",
      "|[WrappedArray(8.5...|Museum der Zeitme...| museum|\n",
      "|[WrappedArray(8.5...|  Gasthaus zum Bären| museum|\n",
      "|[WrappedArray(8.4...|Ortsmuseum Altste...| museum|\n",
      "|[WrappedArray(8.5...|    Haus Konstruktiv| museum|\n",
      "|[WrappedArray(8.5...| Tram-Museeum Zürich| museum|\n",
      "|[WrappedArray(8.5...| Museum der Kulturen| museum|\n",
      "|[WrappedArray(8.5...|       Coninx-Museum| museum|\n",
      "|[WrappedArray(8.4...|Ortsmuseum Albisr...| museum|\n",
      "|[WrappedArray(8.5...|Zoologisches und ...| museum|\n",
      "|[WrappedArray(8.5...|Archäologisches M...| museum|\n",
      "|[WrappedArray(8.5...|Ortsmuseum Schwam...| museum|\n",
      "|[WrappedArray(8.5...|         Focus Terra| museum|\n",
      "|[WrappedArray(8.5...|Zürcher Spielzeug...| museum|\n",
      "+--------------------+--------------------+-------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# go from RDD to dataframe and show the results\n",
    "museums_in_zurich_df = sqlContext.createDataFrame(museums_in_zurich)\n",
    "museums_in_zurich_df.show()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.9"
  },
  "toc": {
   "toc_cell": false,
   "toc_number_sections": true,
   "toc_threshold": 6,
   "toc_window_display": false
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
